package com.chatplus.application.web.notification;

import com.chatplus.application.common.logging.SouthernQuietLogger;
import com.chatplus.application.common.logging.SouthernQuietLoggerFactory;
import com.chatplus.application.common.util.PlusJsonUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;

@Component
public class NotificationPublisher {

    private static final SouthernQuietLogger LOGGER = SouthernQuietLoggerFactory.getLogger(NotificationPublisher.class);

    private final StringRedisTemplate stringRedisTemplate;

    public NotificationPublisher(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    public void publish(Object notification, String streamKey, String messageId) {
        RecordId recordId = stringRedisTemplate.opsForStream()
                .add(StreamRecords.newRecord()
                        .in(streamKey)
                        .ofObject(PlusJsonUtils.toJsonString(notification))
                        .withId(RecordId.of(messageId)));
        LOGGER.message("发布通知").context("streamKey", streamKey).context("messageId", messageId).context("recordId", recordId).info();

    }

    public void publish(Object notification, String messageId) {
        String streamKey = getNotificationSource(notification.getClass());
        publish(notification, streamKey, messageId);
    }

    public void publish(Object notification) {
        publish(notification, RecordId.autoGenerate().getValue());
    }

    public static String getNotificationSource(Class<?> cls) {
        MessageSource annotation = AnnotationUtils.getAnnotation(cls, MessageSource.class);
        return null == annotation || StringUtils.isEmpty(annotation.source()) ? cls.getSimpleName() : annotation.source();
    }

    /**
     * 本地事务提交后发布通知
     *
     * @param notification 通知内容
     */
    public void publishAfterCommit(Object notification) {
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
                @Override
                public void afterCommit() {
                    publish(notification);
                }
            });
        } else {
            publish(notification);
        }
    }
}
